Jar 作业开发

1 Maven 添加pom依赖:

      <dependency>
         <groupId>cn.tongdun.spark</groupId>
         <artifactId>jobapi</artifactId>
         <version>1.0.6-SNAPSHOT</version>
         <scope>provided</scope>
      </dependency>

2 实现接口 cn.tongdun.spark.jobapi.SparkJob:

class ScalaJobDemo extends SparkJob {

    /**
    * 该方法为作业的执行主入口
    */
    override def runJob(sparkSession: SparkSession, logger: SparkJobLogger, args: Array[String]): Unit = {
        logger.info("execute SparkJobDemo")
        val schemaString = "Id Name Age"
        val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))
        val schema = StructType(fields)

        logger.info("add hdfs://tdhdfs/user/tao.fu/testdata")
        val row = sparkSession.sparkContext.textFile("hdfs://tdhdfs/user/tao.fu/testdata")
        val rowRdd = row.map(_.split(",")).map(attr => Row(attr(0).trim, attr(1).trim, attr(2).trim))

        logger.info("create table tdl_spark_people")
        val peopleDf = sparkSession.createDataFrame(rowRdd, schema).createOrReplaceTempView("tdl_spark_people")

        logger.info("execute select * from tdl_spark_people")
        logger.info("Result: "+sparkSession.sql("select * from tdl_spark_people").collectAsList())

    }
}

3 继承 cn.tongdun.spark.jobapi.

AbstractSparkJob 实现 SparkJob接口,提供数据管道功能, 数据管道: 上游作业实例可以传递数据给下游作业实例,仅支持字符串数据,管道大小限制512k。

AbstractSparkJob 操作管道提供如下两个方法: setPipelineData 给管道设置数据, 提供给下游作业实例使用 getPipelineData 从管道获取上游作业实例传输的数据

4 部署应用

将自定义实现的类打成jar包,上传至dataocean平台,新建job作业类型为Spark jar,输入以下指令

  [set key=value;]
  jar包名 入口类名 自定义参数 [--jars jar1,jar2]
  可以输入多行命令,各行之间分号分割
   例如:jobdemo-0.1.0.jar cn.tongdun.spark.demo.SparkJobDemo param1 param2;
       jobdemo-0.1.0.jar cn.tongdun.spark.demo.SparkJobDemo param3 param4

任务的提交支持添加相关依赖,当目标jar包需要依赖相关包时,可以事先将依赖包上传至平台,在--jars中指定依赖包名称,以逗号分割 例如:

     set spark.sql.shuffle.partitions = 5;
     jobserverdemo-1.0-SNAPSHOT.jar cn.tongdun.spark.SparkJobDemo --jars flume-ng-sdk-1.8.0.jar(建议是绝对路径)
     flume-ng-sdk-1.8.0.jar需首先上传至dataocean平台

用户可以自定义spark的执行参数,在命令行之前加上 set key=value; 例如 set spark.sql.shuffle.partitions = 5;(可以设置多个参数,以分号间隔) unset 命令可以重置设定的参数 例如 unset spark.sql.shuffle.partitions (此时spark.sql.shuffle.partitions将被设定为系统默认值)

5 注意事项

  • 在runJob方法中不要捕获异常,直接将异常抛出,另外不要调用System.exit()等方法
  • 查看日志采用SparkJobLogger自带的方法,包括info、warn、error 3种级别
  • 创建临时表推荐使用createOrReplaceTempView 表名需包含至少3个字段,以_作分割,并且以tdl开头,例如tdl_tmp_activity

results matching ""

    No results matching ""